Получение данных о смещении

Редактировал(а) Alexandr Fokin 2023/05/10 11:40

Задание:
Получение информации о текущем состоянии параметров:
  1. Общее кол-во сообщений в topic
  2. Общее кол-во прочитанных(commited) сообщений в topic (по определенной consumerGroup)
  3. Определение общего значения lag в topic(Разность между общей длиной и кол-во прочитанных сообщений) (по определенной consumerGroup)

Опционально по каждому partition.
Данные по topic можно определить по сумме совокупности данных по каждом partition, принадлежащему данному topic.

Решение:
Данные по общему кол-ву сообщений и прочитанным сообщениям можно получить через Consumer. Причем для указанного partition.
Данные по topic формируются через сумму данных всех partitions, входящих в него.
  //номер смещения последнего сообщения в очереди
 consumer
    .QueryWatermarkOffsets(TopicPartitionInfo(topicName, partitionId),ConnectionParamsEntity.ActionTimeout)
    .High

 //Получить активное смещение (последнее прочитанное сообщение)
 consumer
    .Committed(
     new TopicPartition[] { TopicPartitionInfo(topicName, partitionId) },
      ConnectionParamsEntity.ActionTimeout
    )
    .FirstOrDefault()
    .Offset
    .Value;
Теги: